package com.king.hessian;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.net.ConnectException;
import java.net.InetAddress;
import java.net.MalformedURLException;
import java.net.UnknownHostException;
import java.util.*;

import org.aopalliance.intercept.MethodInterceptor;
import org.aopalliance.intercept.MethodInvocation;
import org.apache.commons.lang.StringUtils;
import org.springframework.aop.framework.ProxyFactory;
import org.springframework.beans.factory.FactoryBean;
import org.springframework.remoting.RemoteAccessException;
import org.springframework.remoting.RemoteConnectFailureException;
import org.springframework.remoting.RemoteLookupFailureException;
import org.springframework.remoting.RemoteProxyFailureException;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import com.caucho.hessian.client.HessianConnectionFactory;
import com.caucho.hessian.client.HessianProxyFactory;
import com.caucho.hessian.client.HessianRuntimeException;
import com.caucho.hessian.io.SerializerFactory;
import com.qd.env.ServerMapper;
import com.king.notifier.Notifier;
import com.qd.util.UrlUtils;

/**
 * {@code FactoryBean} that creates the client side of a Hessian service.
 * <p/>
 * Lightly adapted so that one logical service can be backed by several service URLs:
 * each call starts at a randomly chosen upstream and skips the ones reporting themselves
 * as down, and the set of upstreams can be rebuilt at runtime through {@link #refresh()}.
 */
public class HessianProxyFactoryBean extends UrlsBasedRemoteAccessor implements MethodInterceptor, FactoryBean, DynamicUrlSupport {

	/** Read timeout applied when {@link #RPC_DEFAULT_TIMEOUT_KEY} is absent or not a number. */
	public static final int DEFAULT_TIMEOUT = 1000;

	/** Name of the system property that overrides {@link #DEFAULT_TIMEOUT}. */
	public static final String RPC_DEFAULT_TIMEOUT_KEY = "RPC_DEFAULT_TIMEOUT";

	/** Name of the HTTP header that carries the target host. */
	public static final String HTTP_HEADER_HOST = "Host";

	/** Failures of methods whose name contains this text are not counted against the upstream. */
	private static final String FAIL_COUNT_EXEMPT_METHOD = "getUserLeastBillType";

	// Passed to every ProxyWrapper; the alert message built in notifyDown labels it as seconds.
	private int failTimeout = 0;

	// Passed to every ProxyWrapper. NOTE(review): presumably the failure threshold before an
	// upstream counts as down - confirm against ProxyWrapper.
	private int maxFail = 0;

	private final Random random = new Random();

	private HessianProxyFactory proxyFactory = new HessianProxyFactory();

	// Replaced wholesale by rebuildHessianProxies(); invoke() reads it once per call.
	private volatile List<ProxyWrapper> hessianProxies;

	private Map<String, String> extraHeaders = new HashMap<>();

	// Value for the Host header, derived from the configured URLs by UrlUtils.getHostsIfUnique.
	private String host = null;

	private Object serviceProxy;

	private List<Notifier> notifiers = Collections.emptyList();

	/**
	 * Creates the factory bean with overloaded-method support enabled and the read timeout
	 * taken from the {@link #RPC_DEFAULT_TIMEOUT_KEY} system property, falling back to
	 * {@link #DEFAULT_TIMEOUT}.
	 */
	public HessianProxyFactoryBean() {
		long defaultTimeout = toLong(System.getProperty(RPC_DEFAULT_TIMEOUT_KEY), DEFAULT_TIMEOUT);
		setReadTimeout(defaultTimeout);
		setOverloadEnabled(true);
	}

	/**
	 * Returns the configured service URLs after expanding each of them through
	 * {@code ServerMapper}.
	 *
	 * @return a new list with the mapped URLs; empty when no URL is configured
	 */
	@Override
	public List<String> getServiceUrls() {
		List<String> mappedServiceUrls = new ArrayList<>();
		List<String> serviceUrls = super.getServiceUrls();
		if (!CollectionUtils.isEmpty(serviceUrls)) {
			for (String url : serviceUrls) {
				mappedServiceUrls.addAll(ServerMapper.getInstance().getUrls(url, this));
			}
		}
		return mappedServiceUrls;
	}

	/**
	 * Stores the service URLs and remembers the host derived from them for the
	 * {@code Host} header.
	 *
	 * @param serviceUrls the URLs of the remote service
	 */
	@Override
	public void setServiceUrls(List<String> serviceUrls) {
		// NOTE(review): presumably yields the shared host name, or null when the URLs
		// disagree - confirm against UrlUtils.
		host = UrlUtils.getHostsIfUnique(serviceUrls);
		super.setServiceUrls(serviceUrls);
	}

	/**
	 * Convenience variant of {@link #setServiceUrls(List)} for a single URL.
	 *
	 * @param serviceUrl the URL of the remote service
	 */
	public void setServiceUrl(String serviceUrl) {
		host = UrlUtils.getHostsIfUnique(serviceUrl);
		super.setServiceUrls(Arrays.asList(serviceUrl));
	}

	/**
	 * Builds the upstream Hessian proxies and the AOP proxy that is handed out by
	 * {@link #getObject()}. Calls on that proxy pass through a
	 * {@code CacheableMethodInterceptor} and an {@code EmptyIfEmptyMethodInterceptor}
	 * before reaching {@link #invoke(MethodInvocation)}.
	 */
	@Override
	public void afterPropertiesSet() {
		rebuildHessianProxies();
		MethodInterceptor interceptor = new CacheableMethodInterceptor(new EmptyIfEmptyMethodInterceptor(this));
		this.serviceProxy = new ProxyFactory(getServiceInterface(), interceptor).getProxy(getBeanClassLoader());
	}

	/** Rebuilds the upstream proxies from the current service URLs. */
	@Override
	public void refresh() {
		rebuildHessianProxies();
	}

	/** @return the service proxy created in {@link #afterPropertiesSet()}, or null before that */
	@Override
	public Object getObject() {
		return this.serviceProxy;
	}

	/** @return the configured service interface */
	@Override
	public Class<?> getObjectType() {
		return getServiceInterface();
	}

	/** @return always {@code true}; the same proxy instance is returned on every lookup */
	@Override
	public boolean isSingleton() {
		return true;
	}

	/**
	 * Sets the Hessian proxy factory to use.
	 *
	 * @param proxyFactory the factory, or null to fall back to a fresh default instance
	 */
	public void setProxyFactory(HessianProxyFactory proxyFactory) {
		this.proxyFactory = (proxyFactory != null ? proxyFactory : new HessianProxyFactory());
	}

	/** Forwards the serializer factory to the underlying Hessian proxy factory. */
	public void setSerializerFactory(SerializerFactory serializerFactory) {
		this.proxyFactory.setSerializerFactory(serializerFactory);
	}

	/** Forwards the flag to the serializer factory of the underlying Hessian proxy factory. */
	public void setSendCollectionType(boolean sendCollectionType) {
		this.proxyFactory.getSerializerFactory().setSendCollectionType(sendCollectionType);
	}

	/** Forwards the overloaded-method flag to the underlying Hessian proxy factory. */
	public void setOverloadEnabled(boolean overloadEnabled) {
		this.proxyFactory.setOverloadEnabled(overloadEnabled);
	}

	/** Forwards the user name to the underlying Hessian proxy factory. */
	public void setUsername(String username) {
		this.proxyFactory.setUser(username);
	}

	/** Forwards the password to the underlying Hessian proxy factory. */
	public void setPassword(String password) {
		this.proxyFactory.setPassword(password);
	}

	/** Forwards the debug flag to the underlying Hessian proxy factory. */
	public void setDebug(boolean debug) {
		this.proxyFactory.setDebug(debug);
	}

	/** Forwards the chunked-post flag to the underlying Hessian proxy factory. */
	public void setChunkedPost(boolean chunkedPost) {
		this.proxyFactory.setChunkedPost(chunkedPost);
	}

	/** Forwards the read timeout to the underlying Hessian proxy factory. */
	public void setReadTimeout(long timeout) {
		this.proxyFactory.setReadTimeout(timeout);
	}

	/** Switches both the request and the reply protocol of the underlying proxy factory. */
	public void setHessian2(boolean hessian2) {
		this.proxyFactory.setHessian2Request(hessian2);
		this.proxyFactory.setHessian2Reply(hessian2);
	}

	/** Switches the request protocol of the underlying proxy factory. */
	public void setHessian2Request(boolean hessian2) {
		this.proxyFactory.setHessian2Request(hessian2);
	}

	/** Switches the reply protocol of the underlying proxy factory. */
	public void setHessian2Reply(boolean hessian2) {
		this.proxyFactory.setHessian2Reply(hessian2);
	}

	/**
	 * Forwards the connection factory to the underlying Hessian proxy factory. Note that
	 * {@link #rebuildHessianProxies()} installs its own connection factory on every rebuild.
	 */
	public void setHessianConnectionFactory(HessianConnectionFactory hessianConnectionFactory) {
		this.proxyFactory.setConnectionFactory(hessianConnectionFactory);
	}

	/**
	 * Sets additional HTTP headers to send with every request.
	 *
	 * @param extraHeaders the headers; null is treated as "no extra headers" so that a later
	 *                     rebuild does not fail on a missing map
	 */
	public void setExtraHeaders(Map<String, String> extraHeaders) {
		if (extraHeaders != null) {
			this.extraHeaders = extraHeaders;
		} else {
			this.extraHeaders = new HashMap<String, String>();
		}
	}

	/** Sets the fail timeout handed to each upstream wrapper created afterwards. */
	public void setFailTimeout(int failTimeout) {
		this.failTimeout = failTimeout;
	}

	/** Sets the max-fail value handed to each upstream wrapper created afterwards. */
	public void setMaxFail(int maxFail) {
		this.maxFail = maxFail;
	}

	/**
	 * Sets the notifiers that are informed when an upstream is skipped as down.
	 *
	 * @param notifiers the notifiers; null is treated as an empty list so that a call
	 *                  never fails merely because no notifier was configured
	 */
	public void setNotifiers(List<Notifier> notifiers) {
		this.notifiers = (notifiers != null ? notifiers : Collections.<Notifier>emptyList());
	}

	/**
	 * Installs a fresh HTTP connection factory carrying the request headers and replaces
	 * the list of upstream proxies with one proxy per current service URL.
	 *
	 * @throws RemoteLookupFailureException if one of the service URLs is malformed
	 */
	public void rebuildHessianProxies() throws RemoteLookupFailureException {
		try {
			// Work on a copy so the configured header map itself is never modified.
			Map<String, String> extraHeaders = new HashMap<>(this.extraHeaders);
			extraHeaders.put(RpcClientInfo.HTTP_HEADER_X_RPC_CLIENT, RpcClientInfo.getServerName());
			// An explicitly configured Host header wins over the one derived from the URLs.
			if (host != null && !extraHeaders.containsKey(HTTP_HEADER_HOST)) {
				extraHeaders.put(HTTP_HEADER_HOST, host);
			}
			HttpClientHessianConnectionFactory hessianConnectionFactory = new HttpClientHessianConnectionFactory();
			hessianConnectionFactory.setHessianProxyFactory(this.proxyFactory);
			hessianConnectionFactory.setExtraHeaders(extraHeaders);
			setHessianConnectionFactory(hessianConnectionFactory);

			this.hessianProxies = createHessianProxy(this.proxyFactory, getServiceUrls());
		} catch (MalformedURLException ex) {
			throw new RemoteLookupFailureException("Service URL is invalid", ex);
		}
	}

	/**
	 * Creates one wrapped Hessian proxy per service URL.
	 *
	 * @param proxyFactory the factory used to create the proxies
	 * @param serviceUrls  the URLs to create proxies for
	 * @return an unmodifiable list with the proxies, in the order of {@code serviceUrls}
	 * @throws MalformedURLException if one of the URLs is malformed
	 */
	protected List<ProxyWrapper> createHessianProxy(HessianProxyFactory proxyFactory, List<String> serviceUrls)
			throws MalformedURLException {
		List<ProxyWrapper> hessianProxies = new ArrayList<>();
		for (String serviceUrl : serviceUrls) {
			ProxyWrapper hessianProxy = createHessianProxy(proxyFactory, serviceUrl);
			hessianProxies.add(hessianProxy);
		}
		return Collections.unmodifiableList(hessianProxies);
	}

	/**
	 * Creates the wrapped Hessian proxy for a single service URL.
	 *
	 * @param proxyFactory the factory used to create the proxy
	 * @param serviceUrl   the URL of the remote service
	 * @return the proxy wrapped together with the current fail timeout and max-fail settings
	 * @throws MalformedURLException    if the URL is malformed
	 * @throws IllegalArgumentException if no service interface has been configured
	 */
	protected ProxyWrapper createHessianProxy(HessianProxyFactory proxyFactory, String serviceUrl) throws MalformedURLException {
		Assert.notNull(getServiceInterface(), "'serviceInterface' is required");
		Object resultProxy = proxyFactory.create(getServiceInterface(), serviceUrl);
		return new ProxyWrapper(resultProxy, failTimeout, maxFail);
	}

	/**
	 * Dispatches the call to one of the upstream proxies. The search starts at a random
	 * index and moves forward, wrapping around, past every upstream that reports itself as
	 * down. When all of them are down the call is still attempted on the last one examined.
	 *
	 * @param invocation the intercepted method call
	 * @return the value returned by the remote method
	 * @throws IllegalStateException       if no upstream proxy is available
	 * @throws RemoteAccessException       if the Hessian call failed at the transport level
	 * @throws RemoteProxyFailureException if the reflective call itself could not be made
	 * @throws Throwable                   any other exception raised by the remote method
	 */
	@Override
	public Object invoke(MethodInvocation invocation) throws Throwable {
		// Read the volatile field once so a concurrent rebuild cannot swap the list mid-call.
		List<ProxyWrapper> hessianProxies = this.hessianProxies;
		if (hessianProxies == null || hessianProxies.isEmpty()) {
			throw new IllegalStateException("HessianProxyFactoryBean is not properly initialized - "
					+ "invoke 'afterPropertiesSet' before attempting any operations");
		}

		ClassLoader originalClassLoader = overrideThreadContextClassLoader();

		ProxyWrapper proxyWrapper = null;
		try {
			final int i = random.nextInt(hessianProxies.size());
			proxyWrapper = hessianProxies.get(i);
			int index = i;
			while (proxyWrapper.isDown()) {
				notifyDown(proxyWrapper);
				index = (index + 1) % hessianProxies.size();
				if (index != i) {
					proxyWrapper = hessianProxies.get(index);
				} else {
					// Back at the starting point: every upstream is down, so give up searching.
					logger.error("all upstream of " + getServiceInterface().getName() + getServiceUrls() + " is down!");
					break;
				}
			}

			return invocation.getMethod().invoke(proxyWrapper.getProxy(), invocation.getArguments());
		} catch (InvocationTargetException ex) {
			if (ex.getTargetException() instanceof HessianRuntimeException) {
				HessianRuntimeException hre = (HessianRuntimeException) ex.getTargetException();
				Throwable rootCause = (hre.getRootCause() != null ? hre.getRootCause() : hre);
				notifyFail(invocation, proxyWrapper, rootCause);
				throw convertHessianAccessException(rootCause);
			} else if (ex.getTargetException() instanceof UndeclaredThrowableException) {
				UndeclaredThrowableException utex = (UndeclaredThrowableException) ex.getTargetException();
				notifyFail(invocation, proxyWrapper, utex);
				throw convertHessianAccessException(utex.getUndeclaredThrowable());
			}
			// Anything else was thrown by the remote method itself and is passed through as is.
			throw ex.getTargetException();
		} catch (Throwable ex) {
			throw new RemoteProxyFailureException("Failed to invoke Hessian proxy for remote service", ex);
		} finally {
			resetThreadContextClassLoader(originalClassLoader);
		}
	}

	/**
	 * Logs that an upstream was skipped as down and passes the same message to every
	 * configured notifier.
	 *
	 * @param proxyWrapper the upstream that reported itself as down
	 */
	private void notifyDown(ProxyWrapper proxyWrapper) {
		String ipStr = getLocalIpStr();
		if (StringUtils.isNotBlank(ipStr)) {
			// Append only the separator; the address is already in ipStr.
			ipStr += "-call-";
		}
		String message = ipStr + proxyWrapper.getProxy().toString() + "-maybedown-" + proxyWrapper.getFailCount() + "-fail-" + failTimeout + "-seconds";
		logger.error(message);
		for (Notifier notifier : notifiers) {
			notifier.notify(proxyWrapper.getProxy().getClass().getCanonicalName(), message);
		}
	}

	/**
	 * Records a failed call on the upstream, unless the called method is exempt from
	 * failure counting, and writes a debug log entry.
	 *
	 * @param invocation   the failed call, may be null
	 * @param proxyWrapper the upstream the call was sent to, may be null
	 * @param rootCause    the failure to log
	 */
	private void notifyFail(MethodInvocation invocation, ProxyWrapper proxyWrapper, Throwable rootCause) {
		String methodName = null;
		if (invocation != null && invocation.getMethod() != null) {
			methodName = invocation.getMethod().getName();
		}
		boolean exempt = (methodName != null && methodName.contains(FAIL_COUNT_EXEMPT_METHOD));
		if (proxyWrapper != null && !exempt) {
			proxyWrapper.fail();
		}
		if (logger.isDebugEnabled()) {
			// Same null tolerance as above, so that logging can never hide the real failure.
			Object proxy = (proxyWrapper != null ? proxyWrapper.getProxy() : null);
			logger.debug("hessian request of proxy " + proxy + " failed!, method=" + methodName, rootCause);
		}
	}

	/**
	 * Converts a low-level failure into the matching Spring remoting exception.
	 *
	 * @param ex the failure to convert
	 * @return a {@link RemoteConnectFailureException} for a {@link ConnectException},
	 *         otherwise a plain {@link RemoteAccessException}
	 */
	protected RemoteAccessException convertHessianAccessException(Throwable ex) {
		if (ex instanceof ConnectException) {
			return new RemoteConnectFailureException("Cannot connect to Hessian remote service", ex);
		} else {
			return new RemoteAccessException("Cannot access Hessian remote service", ex);
		}
	}

	/**
	 * Parses a decimal long.
	 *
	 * @param str          the text to parse, may be null
	 * @param defaultValue the value to return when {@code str} is null or not a valid long
	 * @return the parsed value or {@code defaultValue}
	 */
	private static long toLong(String str, long defaultValue) {
		if (str == null) {
			return defaultValue;
		}
		try {
			return Long.parseLong(str);
		} catch (NumberFormatException nfe) {
			return defaultValue;
		}
	}

	/**
	 * Returns the textual IP address of the local host.
	 *
	 * @return the address, or an empty string when the local host cannot be resolved
	 */
	private String getLocalIpStr() {
		try {
			return InetAddress.getLocalHost().getHostAddress();
		} catch (UnknownHostException e) {
			// Best effort only: the address merely decorates alert messages.
			logger.warn("Cannot resolve local host address", e);
			return "";
		}
	}

}
